package com.tea.modules.admin;

import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Small demo of common Kafka {@link AdminClient} operations (create, list, describe,
 * delete, reconfigure and repartition a topic) against a broker at {@code localhost:9094}.
 *
 * <p>Every operation creates its own short-lived client and closes it when done.
 *
 * @author jaymin
 * @since 2021/12/23 2:10
 */
public class AdminDemo {
    /** Name of the topic every demo operation works on. */
    public static final String TOPIC_NAME = "my_topic";

    /**
     * Creates a new admin client connected to the demo broker.
     *
     * @return a freshly created client; the caller is responsible for closing it
     */
    public static AdminClient adminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        return AdminClient.create(properties);
    }

    /**
     * Creates {@link #TOPIC_NAME} with one partition and a replication factor of one.
     *
     * <p>The request is asynchronous, so this method waits for it to finish before
     * returning. A failure is reported on {@code System.err} instead of being thrown,
     * which keeps the method usable as a best-effort step.
     */
    public static void createTopic() {
        try (AdminClient adminClient = adminClient()) {
            short replicationFactor = 1;
            NewTopic topic = new NewTopic(TOPIC_NAME, 1, replicationFactor);
            CreateTopicsResult topics = adminClient.createTopics(Lists.newArrayList(topic));
            System.out.println("-------------------------");
            try {
                // Block until the broker has answered; otherwise only pending futures would be printed.
                topics.all().get();
                System.err.println(topics.values());
            } catch (ExecutionException e) {
                System.err.println("createTopic failed for " + TOPIC_NAME + ": " + e.getCause());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                System.err.println("createTopic interrupted for " + TOPIC_NAME);
            }
        }
    }

    /**
     * Prints the names and listings of all topics, including internal ones.
     *
     * @throws ExecutionException   if the list request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void topicLists() throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = AdminDemo.adminClient()) {
            ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
            // Ask for internal topics as well.
            listTopicsOptions.listInternal(true);
            // The options must be passed explicitly; the no-arg overload would ignore them.
            ListTopicsResult listTopicsResult = adminClient.listTopics(listTopicsOptions);
            Collection<TopicListing> topicListings = listTopicsResult.listings().get();
            // Topic names only.
            System.out.println("list:" + listTopicsResult.names().get());
            // Full listings.
            topicListings.forEach(System.out::println);
        }
    }


    /**
     * Prints the description of {@link #TOPIC_NAME}. <br>
     * Sample output:
     * <pre>
     * test_topic:(name=test_topic, internal=false, partitions=(partition=0, leader=172.18.0.1:9095 (id: 2 rack: null),
     * replicas=172.18.0.1:9095 (id: 2 rack: null), 172.18.0.1:9096 (id: 3 rack: null), 172.18.0.1:9094 (id: 1 rack: null),
     * isr=172.18.0.1:9094 (id: 1 rack: null),
     * 172.18.0.1:9096 (id: 3 rack: null), 172.18.0.1:9095 (id: 2 rack: null)))
     * </pre>
     *
     * @throws ExecutionException   if the describe request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void describeTopics() throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = adminClient()) {
            DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Lists.newArrayList(TOPIC_NAME));
            describeTopicsResult.all().get().forEach((d, s) -> System.out.println(d + ":" + s));
        }
    }

    /**
     * Deletes {@link #TOPIC_NAME} and prints the (void) result of the request.
     *
     * @throws ExecutionException   if the delete request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void delTopics() throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = adminClient()) {
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Lists.newArrayList(TOPIC_NAME));
            System.out.println(deleteTopicsResult.all().get());
        }
    }

    /**
     * Prints the configuration of {@link #TOPIC_NAME}. <br>
     * Sample output:
     * <pre>
     * ConfigResource{type=TOPIC, name='my_topic'}:
     * Config(entries=[ConfigEntry(name=compression.type, value=producer, isDefault=false, isSensitive=false, isReadOnly=false),
     * </pre>
     *
     * @throws ExecutionException   if the describe request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void describeConfig() throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = adminClient()) {
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
            DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Lists.newArrayList(configResource));
            Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
            configResourceConfigMap.forEach((k, v) -> System.out.println(k + ":" + v));
        }
    }

    /**
     * Increases the partition count of {@link #TOPIC_NAME} to two, then prints the
     * topic description.
     *
     * @throws ExecutionException   if the partition or describe request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void incrPartition() throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = adminClient()) {
            Map<String, NewPartitions> partitionsMap = new HashMap<>();
            partitionsMap.put(TOPIC_NAME, NewPartitions.increaseTo(2));
            // Wait for the change to complete so the description below reflects it.
            adminClient.createPartitions(partitionsMap).all().get();
        }
        describeTopics();
    }


    /**
     * Sets {@code preallocate=true} on {@link #TOPIC_NAME} and prints the (void) result.
     *
     * <p>NOTE(review): newer Kafka clients deprecate {@code alterConfigs} in favor of
     * {@code incrementalAlterConfigs}; confirm the client version before migrating.
     *
     * @throws ExecutionException   if the alter request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void modifyConfig() throws ExecutionException, InterruptedException {
        try (AdminClient adminClient = adminClient()) {
            Map<ConfigResource, Config> configResourceConfigMap = new HashMap<>();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
            Config config = new Config(Lists.newArrayList(new ConfigEntry("preallocate", "true")));
            configResourceConfigMap.put(configResource, config);
            AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configResourceConfigMap);
            System.out.println(alterConfigsResult.all().get());
        }
    }

    /**
     * Demo entry point: creates the topic and then grows it to two partitions.
     * The commented-out calls are the other demo operations, left as toggles.
     *
     * @param args unused
     * @throws ExecutionException   if a broker request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        soutConfig();
        createTopic();
//        topicLists();
//        describeTopics();
//        delTopics();
//        describeConfig();
//        modifyConfig();
        incrPartition();
    }

    /** Prints the string form of a freshly created admin client. */
    private static void soutConfig() {
        try (AdminClient adminClient = AdminDemo.adminClient()) {
            System.out.println(adminClient.toString());
        }
    }
}
